Strip huge piles of cruft from the connection infrastructure. We now actually
authoremellor@leeni.uk.xensource.com <emellor@leeni.uk.xensource.com>
Thu, 8 Dec 2005 15:04:31 +0000 (15:04 +0000)
committeremellor@leeni.uk.xensource.com <emellor@leeni.uk.xensource.com>
Thu, 8 Dec 2005 15:04:31 +0000 (15:04 +0000)
block inside accept rather than using select to poll and then calling accept
regardless of the outcome of the select call, and then failing because the
socket is non-blocking.

SocketClientConnection, SocketConnector, TCPClientConnection, TCPConnector,
connectTCP, UnixClientConnection, UnixConnector, connectUnix have gone.

loseConnection and stopListening and closeSocket (where they are needed) are
now called close.  startListening is now called listen.

Closes bug #379.

Relieves a weight from the shoulders of the universe.

Signed-off-by: Ewan Mellor <ewan@xensource.com>
tools/python/xen/web/connection.py
tools/python/xen/web/protocol.py
tools/python/xen/web/tcp.py
tools/python/xen/web/unix.py
tools/python/xen/xend/server/relocate.py

index 89de272db476f6fd62e67af9df4603b738b1b78c..342e6a6f5979c5f67b3920c3a1ea73280a08497b 100644 (file)
@@ -30,11 +30,8 @@ specifying what kind of socket they are. There are subclasses
 for TCP and unix-domain sockets (see tcp.py and unix.py).
 """
 
-"""We make sockets non-blocking so that operations like accept()
-don't block. We also select on a timeout. Otherwise we have no way
-of getting the threads to shutdown.
-"""
-SELECT_TIMEOUT = 2.0
+BUFFER_SIZE = 1024
+
 
 class SocketServerConnection:
     """An accepted connection to a server.
@@ -45,73 +42,35 @@ class SocketServerConnection:
         self.protocol = protocol
         self.addr = addr
         self.server = server
-        self.buffer_n = 1024
-        self.thread = None
         self.protocol.setTransport(self)
 
+
     def run(self):
-        self.thread = threading.Thread(target=self.main)
-        self.thread.start()
+        threading.Thread(target=self.main).start()
 
-    def main(self):
-        while True:
-            if not self.thread: break
-            if self.select(): break
-            if not self.thread: break
-            data = self.read()
-            if data is None: continue
-            if data is True: break
-            if self.dataReceived(data): break
 
-    def select(self):
-        try:
-            select.select([self.sock], [], [], SELECT_TIMEOUT)
-            return False
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return False
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def read(self):
+    def main(self):
         try:
-            data = self.sock.recv(self.buffer_n)
-            if data == '':
-                self.loseConnection()
-                return True
-            return data
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return None
-            else:
-                self.loseConnection(ex)
-                return True
+            while True:
+                try:
+                    data = self.sock.recv(BUFFER_SIZE)
+                    if data == '':
+                        break
+                    if self.protocol.dataReceived(data):
+                        break
+                except socket.error, ex:
+                    if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR):
+                        break
+        finally:
+            try:
+                self.sock.close()
+            except:
+                pass
 
-    def dataReceived(self, data):
-        try:
-            self.protocol.dataReceived(data)
-        except SystemExit:
-            raise
-        except Exception, ex:
-            self.loseConnection(ex)
-            return True
-        return False
 
     def write(self, data):
         self.sock.send(data)
 
-    def loseConnection(self, reason=None):
-        self.thread = None
-        self.closeSocket(reason)
-
-    def closeSocket(self, reason):
-        try:
-            self.sock.close()
-        except SystemExit:
-            raise
-        except:
-            pass
 
 class SocketListener:
     """A server socket, running listen in a thread.
@@ -126,192 +85,44 @@ class SocketListener:
         self.backlog = backlog
         self.thread = None
 
+
     def createSocket(self):
         raise NotImplementedError()
 
+
     def setCloExec(self):
         fcntl.fcntl(self.sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
 
+
     def acceptConnection(self, sock, protocol, addr):
         return SocketServerConnection(sock, protocol, addr, self)
 
-    def startListening(self):
+
+    def listen(self):
         if self.sock or self.thread:
             raise IOError("already listening")
         self.sock = self.createSocket()
-        self.sock.setblocking(0)
         self.sock.listen(self.backlog)
         self.run()
 
-    def stopListening(self, reason=None):
-        self.loseConnection(reason)
 
     def run(self):
         self.thread = threading.Thread(target=self.main)
         self.thread.start()
 
-    def main(self):
-        while True:
-            if not self.thread: break
-            if self.select(): break
-            if self.accept(): break
-
-    def select(self):
-        try:
-            select.select([self.sock], [], [], SELECT_TIMEOUT)
-            return False
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return False
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def accept(self):
-        try:
-            (sock, addr) = self.sock.accept()
-            sock.setblocking(0)
-            return self.accepted(sock, addr)
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return False
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def accepted(self, sock, addr):
-        self.acceptConnection(sock, self.protocol_class(), addr).run()
-        return False
-
-    def loseConnection(self, reason=None):
-        self.thread = None
-        self.closeSocket(reason)
-
-    def closeSocket(self, reason):
-        try:
-            self.sock.close()
-        except SystemExit:
-            raise
-        except Exception, ex:
-            pass
-
-
-class SocketClientConnection:
-    """A connection to a server from a client.
-
-    Call connectionMade() on the protocol in a thread when connected.
-    It is completely up to the protocol what to do.
-    """
-
-    def __init__(self, connector):
-        self.addr = None
-        self.connector = connector
-        self.buffer_n = 1024
-
-    def createSocket (self):
-        raise NotImplementedError()
-
-    def write(self, data):
-        if self.sock:
-            return self.sock.send(data)
-        else:
-            return 0
-
-    def connect(self, timeout):
-        #todo: run a timer to cancel on timeout?
-        try:
-            sock = self.createSocket()
-            sock.connect(self.addr)
-            self.sock = sock
-            self.protocol = self.connector.protocol_class()
-            self.protocol.setTransport(self)
-        except SystemExit:
-            raise
-        except Exception, ex:
-            self.connector.connectionFailed(ex)
-            return False
-
-        self.thread = threading.Thread(target=self.main)
-        #self.thread.setDaemon(True)
-        self.thread.start()
-        return True
 
     def main(self):
         try:
-            # Call the protocol in a thread.
-            # Up to it what to do.
-            self.protocol.connectionMade(self.addr)
-        except SystemExit:
-            raise
-        except Exception, ex:
-            self.loseConnection(ex)
-
-    def mainLoop(self):
-        # Something a protocol could call.
-        while True:
-            if not self.thread: break
-            if self.select(): break
-            if not self.thread: break
-            data = self.read()
-            if data is None: continue
-            if data is True: break
-            if self.dataReceived(data): break
-
-    def select(self):
-        try:
-            select.select([self.sock], [], [], SELECT_TIMEOUT)
-            return False
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return False
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def read(self):
-        try:
-            data = self.sock.recv(self.buffer_n)
-            return data
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return None
-            else:
-                self.loseConnection(ex)
-                return True
-        
-    def dataReceived(self, data):
-        if not self.protocol:
-            return True
-        try:
-            self.protocol.dataReceived(data)
-        except SystemExit:
-            raise
-        except Exception, ex:
-            self.loseConnection(ex)
-            return True
-        return False
-
-    def loseConnection(self, reason=None):
-        self.thread = None
-        self.closeSocket(reason)
-
-    def closeSocket(self, reason):
-        try:
-            if self.sock:
+            while True:
+                try:
+                    (sock, addr) = self.sock.accept()
+                    self.acceptConnection(sock, self.protocol_class(),
+                                          addr).run()
+                except socket.error, ex:
+                    if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR):
+                        break
+        finally:
+            try:
                 self.sock.close()
-        except SystemExit:
-            raise
-        except:
-            pass
-
-class SocketConnector:
-    """A client socket. Connects to a server and runs the client protocol
-    in a thread.
-    """
-
-    def __init__(self, protocol_class):
-        self.protocol_class = protocol_class
-        self.transport = None
-
-    def connect(self):
-        pass
+            except:
+                pass
index 54f44b0628b2d1a97037f128e36df6bae155df19..603973a3fd5f7f4fc6a3f83c36bc2a5cef2784f4 100644 (file)
@@ -25,7 +25,7 @@ class Protocol:
         self.transport = transport
 
     def dataReceived(self, data):
-        print 'Protocol>dataReceived>'
+        raise NotImplementedError()
 
     def write(self, data):
         if self.transport:
index 674bf93f8c9b06418275c20b09827839802d1fbb..f3d67a7de4d4c0c9cc3416453a45b9ef1a31b4da 100644 (file)
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 #============================================================================
 # Copyright (C) 2005 Mike Wray <mike.wray@hp.com>
+# Copyright (C) 2005 XenSource Ltd.
 #============================================================================
 
-import sys
+
 import socket
-import types
 import time
 import errno
 
 from connection import *
-from protocol import *
+
 
 class TCPListener(SocketListener):
 
@@ -52,48 +52,8 @@ class TCPListener(SocketListener):
     def acceptConnection(self, sock, protocol, addr):
         return SocketServerConnection(sock, protocol, addr, self)
 
-class TCPClientConnection(SocketClientConnection):
-
-    def __init__(self, host, port, bindAddress, connector):
-        SocketClientConnection.__init__(self, connector)
-        self.addr = (host, port)
-        self.bindAddress = bindAddress
-
-    def createSocket(self):
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        if self.bindAddress is not None:
-            sock.bind(self.bindAddress)
-        return sock
-    
-class TCPConnector(SocketConnector):
-
-    def __init__(self, host, port, protocol, timeout=None, bindAddress=None):
-        SocketConnector.__init__(self, protocol)
-        self.host = host
-        self.port = self.servicePort(port)
-        self.bindAddress = bindAddress
-        self.timeout = timeout
-
-    def servicePort(self, port):
-        if isinstance(port, types.StringTypes):
-            try:
-                port = socket.getservbyname(port, 'tcp')
-            except socket.error, ex:
-                raise IOError("unknown service: " + ex)
-        return port
-
-    def connect(self):
-        self.transport = TCPClientConnection(
-            self.host, self.port, self.bindAddress, self)
-        self.transport.connect(self.timeout)
 
 def listenTCP(port, protocol, interface='', backlog=None):
     l = TCPListener(port, protocol, interface=interface, backlog=backlog)
-    l.startListening()
-    return l
-
-def connectTCP(host, port, protocol, timeout=None, bindAddress=None):
-    c = TCPConnector(host, port, protocol, timeout=timeout,
-                     bindAddress=bindAddress)
-    c.connect()
-    return c
+    l.listen()
+    l.setCloExec()
index 2d03b092609817923ab5396c360edcb383855dde..64ae2fcf1bf2b973d20faa7621da9c0f2e9ffae4 100644 (file)
 # Copyright (C) 2005 XenSource Ltd.
 #============================================================================
 
-import sys
+
 import socket
 import os
 import os.path
 
 from connection import *
-from protocol import *
+
 
 class UnixListener(SocketListener):
 
@@ -48,33 +48,6 @@ class UnixListener(SocketListener):
     def acceptConnection(self, sock, protocol, addr):
         return SocketServerConnection(sock, protocol, self.path, self)
 
-class UnixClientConnection(SocketClientConnection):
-
-    def __init__(self, addr, connector):
-        SocketClientConnection.__init__(self, connector)
-        self.addr = addr
-        
-    def createSocket(self):
-        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        return sock
-    
-class UnixConnector(SocketConnector):
-
-    def __init__(self, path, protocol, timeout=None):
-        SocketConnector.__init__(self, protocol)
-        self.addr = path
-        self.timeout = timeout
-
-    def connect(self):
-        self.transport = UnixClientConnection(self.addr, self)
-        self.transport.connect(self.timeout)
 
 def listenUNIX(path, protocol, backlog=None):
-    l = UnixListener(path, protocol, backlog=backlog)
-    l.startListening()
-    return l
-
-def connectUNIX(path, protocol, timeout=None):
-    c = UnixConnector(path, protocol, timeout=timeout)
-    c.connect()
-    return c
+    UnixListener(path, protocol, backlog=backlog).listen()
index d4480f242ed31cba03302949952d1942d892c354..c20d57715625d96952533475fc6245ac5dcad2ac 100644 (file)
@@ -44,15 +44,15 @@ class RelocationProtocol(protocol.Protocol):
                 res = self.dispatch(val)
                 self.send_result(res)
             if self.parser.at_eof():
-                self.loseConnection()
+                self.close()
         except SystemExit:
             raise
         except:
             self.send_error()
 
-    def loseConnection(self):
+    def close(self):
         if self.transport:
-            self.transport.loseConnection()
+            self.transport.close()
 
     def send_reply(self, sxpr):
         io = StringIO.StringIO()
@@ -100,15 +100,13 @@ class RelocationProtocol(protocol.Protocol):
         return l
 
     def op_quit(self, _1, _2):
-        self.loseConnection()
+        self.close()
 
     def op_receive(self, name, _):
         if self.transport:
             self.send_reply(["ready", name])
-            self.transport.sock.setblocking(1)
             XendDomain.instance().domain_restore_fd(
                 self.transport.sock.fileno())
-            self.transport.sock.setblocking(0)
         else:
             log.error(name + ": no transport")
             raise XendError(name + ": no transport")
@@ -122,5 +120,4 @@ def listenRelocation():
     if xroot.get_xend_relocation_server():
         port = xroot.get_xend_relocation_port()
         interface = xroot.get_xend_relocation_address()
-        l = tcp.listenTCP(port, RelocationProtocol, interface=interface)
-        l.setCloExec()
+        tcp.listenTCP(port, RelocationProtocol, interface=interface)